{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sc = SparkContext(conf=SparkConf())\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Logistic regression with pyspark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------+---------+---+\n",
      "|age|education|wantsMore|  y|\n",
      "+---+---------+---------+---+\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "+---+---------+---------+---+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)\n",
    "cuse.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Process categorical columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following code does three things with pipeline:\n",
    "\n",
    "* **`StringIndexer`** all categorical columns\n",
    "* **`OneHotEncoder`** all categorical index columns\n",
    "* **`VectorAssembler`** all feature columns into one vector column"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Categorical columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "# categorical columns\n",
    "categorical_columns = cuse.columns[0:3]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build StringIndexer stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]\n",
    "# encode label column and add it to stringindexer_stages\n",
    "stringindexer_stages += [StringIndexer(inputCol='y', outputCol='label')]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build OneHotEncoder stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build VectorAssembler stage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "feature_columns = ['onehot_' + c for c in categorical_columns]\n",
    "vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features') "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build pipeline model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# all stages\n",
    "all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]\n",
    "pipeline = Pipeline(stages=all_stages)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fit pipeline model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pipeline_model = pipeline.fit(cuse)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Transform data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "|   onehot_age|onehot_education|onehot_wantsMore|           features|label|\n",
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "final_columns = feature_columns + ['features', 'label']\n",
    "cuse_df = pipeline_model.transform(cuse).\\\n",
    "            select(final_columns)\n",
    "            \n",
    "cuse_df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Split data into training and test datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build cross-validation model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Estimator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression\n",
    "logr = LogisticRegression(featuresCol='features', labelCol='label')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parameter grid"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.tuning import ParamGridBuilder\n",
    "param_grid = ParamGridBuilder().\\\n",
    "    addGrid(logr.regParam, [0, 0.5, 1, 2]).\\\n",
    "    addGrid(logr.elasticNetParam, [0, 0.5, 1]).\\\n",
    "    build()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Evaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cross-validation model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.tuning import CrossValidator\n",
    "cv = CrossValidator(estimator=logr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fit cross-validation model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cv_model = cv.fit(cuse_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Prediction on training data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "|features |label|prediction|rawPrediction                             |probability                            |\n",
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pred_training_cv = cv_model.transform(training)\n",
    "pred_training_cv.select(show_columns).show(5, truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Prediction on test data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "|features |label|prediction|rawPrediction                             |probability                            |\n",
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|\n",
      "+---------+-----+----------+------------------------------------------+---------------------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pred_test_cv = cv_model.transform(test)\n",
    "pred_test_cv.select(show_columns).show(5, truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Intercept and coefficients of the regression model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Intercept: 0.05602431718564116\n",
      "coefficients: [-0.280625539774,-0.799857435517,-1.18923909827,0.324994746147,-0.832954766261]\n"
     ]
    }
   ],
   "source": [
    "print('Intercept: ' + str(cv_model.bestModel.intercept) + \"\\n\"\n",
    "     'coefficients: ' + str(cv_model.bestModel.coefficients))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameters from the best model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The best RegParam is:  0.0 \n",
      " The best ElasticNetParam is: cv_model.bestModel._java_obj.getElasticNetParam()\n"
     ]
    }
   ],
   "source": [
    "print('The best RegParam is: ', cv_model.bestModel._java_obj.getRegParam(), \"\\n\",\n",
    "     'The best ElasticNetParam is: cv_model.bestModel._java_obj.getElasticNetParam()')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Linear regression with R"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import Data (below is R code!)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "#====== This is R code! =========\n",
    "cuse = read.table('http://data.princeton.edu/wws509/datasets/cuse.dat', header = T)\n",
    "\n",
    "# convert count data to binary data\n",
    "not_using = rep(1:nrow(cuse), times=cuse$notUsing)\n",
    "using = rep(1:nrow(cuse), times=cuse$using)\n",
    "cuse_binary = cuse[c(not_using, using), 1:3]\n",
    "cuse_binary$y = c(rep(0, length(not_using)), rep(1, length(using)))\n",
    "\n",
    "# write data into a file\n",
    "write.csv(cuse_binary, file='data/cuse_binary.csv', row.names = FALSE)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Process categorical variables \n",
    "Process categorical variables so they have the same pattern as in pyspar. Element levels are in the descending order of element frequencies."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "#====== This is R code! =========\n",
    "cuse_binary$age = factor(cuse_binary$age, \n",
    "                         levels = names(sort(table(cuse_binary$age), decreasing = TRUE)))\n",
    "cuse_binary$education = factor(cuse_binary$education,\n",
    "                               levels = names(sort(table(cuse_binary$education), decreasing = TRUE)))\n",
    "cuse_binary$wantsMore = factor(cuse_binary$wantsMore,\n",
    "                               levels = names(sort(table(cuse_binary$wantsMore), decreasing = TRUE)))\n",
    "\n",
    "# encode label column\n",
    "cuse_binary$y = factor(cuse_binary$y,\n",
    "                               levels = names(sort(table(cuse_binary$y))))\n",
    "glm_cuse = glm(y~age + education + wantsMore, data = cuse_binary, family = binomial(link = \"logit\"))\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "#====== This is R code! =========\n",
    "glm_cuse$coefficients\n",
    "\n",
    " (Intercept)     age25-29       age<25     age40-49 educationlow  wantsMoreno \n",
    "   0.7325613    0.5192319    0.9086135   -0.2806254    0.3249947   -0.8329548 \n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
